批流融合:Pulsar Flink Connector 2.7.0 发布
关于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/
前言
• 批流一体是数据计算的未来趋势,Pulsar Flink Connector 为 Apache Pulsar 和 Apache Flink 的批流一体提供了理想的解决方案。• Pulsar Flink Connector 2.7.0 支持 Pulsar 2.7.0 和 Flink 1.12 的版本,进一步实现了两者的特性融合。• 您可以轻松迁移到 Pulsar Flink Connector 2.7.0,并使用新版本的各种功能。• Pulsar Flink Connector 2.7.0 会直接贡献到 Flink 主仓库。
Pulsar Flink Connector 简介
随着数据日益膨胀,采用事件流处理数据至关重要。Apache Flink 将批流处理统一到计算引擎中。Apache Pulsar(与 Apache BookKeeper 一起)以 "流 "的方式统一数据。在 Pulsar 中,数据存储成一个副本,以流(streaming)(通过 pub-sub 接口)和 segment(用于批处理)的方式进行访问。 Pulsar 解决了企业在使用不同的存储和消息技术解决方案时遇到的数据孤岛问题。把 Flink 和 Pulsar 结合使用,这两种开源技术可以创建一个统一的数据架构,为实时数据驱动企业提供最佳解决方案。
Pulsar Flink Connector 基于 Apache Pulsar 和 Apache Flink 提供弹性数据处理,允许 Apache Flink 读写 Apache Pulsar 中的数据。使用 Pulsar Flink Connector,企业能够更专注于业务逻辑,无需关注存储问题。
面临的挑战
我们决定开发 Pulsar Flink Connector(连接器)时,Flink 和 Pulsar 社区都为之振奋,很多用户纷纷开始使用。借助 Pulsar Flink 连接器,惠普(HPE)构建了实时计算平台,BIGO 构建了实时消息处理系统,知乎的团队也在评估连接器是否满足其内部需求。
随着越来越多的用户在使用 Pulsar Flink Connector,我们从社区中听到一个共同的问题:数据序列化的设计不恰当。Pulsar 消息是结构化的,且有数据序列化能力(Schema)。旧版本 Pulsar Flink 连接器的序列化机制侧重于 Pulsar 的序列化能力,对 Flink 序列化机制支持的较差。因此用户必须进行大量的配置才能使用连接器进行实时计算。
为了让 Pulsar Flink 连接器更方便使用,我们决定构建完全支持 Flink 序列化机制的功能,节省用户的配置工作量。
Pulsar Flink Connector 2.7.0 新功能
Pulsar Flink Connector 2.7.0 支持 Apache Pulsar 2.7.0 和 Apache Flink 1.12 版本中的所有功能,完全兼容 Flink 连接器和 Flink 消息格式。现在,您可以使用 Flink 中的重要功能,如 exactly-once Sink、upsert Pulsar、数据定义语言(DDL)计算列、水印和元数据;还可以使用 Pulsar 中的 Key-Shared 订阅模式,保证消息的处理顺序,而无需太多的配置。此外,您还可以根据自己的业务轻松定制配置。
下面,我们详细介绍Pulsar Flink Connector 2.7.0 的主要功能。
高性能的有序消息队列
在有些场景下,用户需要消息严格保证消息顺序,才能保证业务处理正确。通常在消息严格保序的情况下,只能同时有一个消费者消费消息,才能保证顺序。这样会导致消息的吞吐量大幅度降低。Pulsar 为这样的场景设计了 Key-Shared 订阅模式,通过对消息增加 Key,将相同 Key Hash 的消息路由到同一个消息者上,这样既保证了消息的消费顺序,又提高了吞吐量。
我们在 Pulsar Flink 连接器中也添加了对该功能的支持。可以通过配置参数enable-key-hash-range=true
启用这个功能。开启功能后,会根据任务的并行度划分每个消费者处理的 Key Hash 范围。
Pulsar Sink 支持 exactly-once 语义
Pulsar 2.7 版本支持事务,极大增强了 Flink Sink 的容错能力。之前版本中,Sink 算子最多支持at-least-once 语义,这不能完全适配一些对 End-to-End 一致性要求比较高的场景,用户必须自己想办法维护去重的逻辑,这增加了用户编写代码的复杂程度。
在新版本的连接器中,得益于 Pulsar 强大的事务功能,我们设计了 exactly-once 的 sink 算子。Flink 框架早就已经对这种场景有了抽象地支持,其使用两阶段的提交协议来实现 Sink 算子的 Exactly-once 语义,该协议的主要生命周期方法包括 beginTransaction(), preCommit(), commit(), abort(), recoverAndCommit(), recoverAndAbort()。
我们的核心想法在于,用户在创建 sink 时可以自由选择算子的语义,内部的逻辑变化对用户代码是透明的。由于 Pulsar 的事务和 Flink 中两阶段提交协议的生命周期有诸多类似之处,我们可以用比较优雅地方式实现这项工作,而不要对 Pulsar 做过多侵入。
beginTransaction 以及 preCommit 的实现比较简单。只需要开启一个 Pulsar 事务,在 checkpoint 到来后持久化该事务的 TID。在 preCommit 阶段保证消息已经全部被 Flush 到 Pulsar 中,并且确保preCommit 成功的消息最终一定会被 Commit。
实现上需要思考的地方在于 recoverAndCommit 以及 recoverAndAbort 的情景。在 Kafka 连接器中,受限于 Kafka 的一些特性,使用了一些相对 hack 的方式来用于 recoverAndCommit 的工作。而 Pulsar 的事务由于不依赖于具体的 Producer,可以很容易的根据 TID 来接管事务的提交和撤回。
Pulsar 事务的灵活和高效与 Flink 优秀的架构抽象碰撞之后,Pulsar Flink 连接器具备了更大的潜力。同时,由于 Pulsar 的事务正处于活跃迭代状态,我们也会不断优化 Connector 中 Transactional Sink。
引入 upsert-pulsar 连接器
Flink 社区用户对 Upsert 模式消息队列有很高的需求,主要原因有三个:
• 将 Pulsar Topic 解释为一个 changelog 流,它将带有键的记录解释为 upsert 事件。 • 作为实时管道的一部分,将多个流连接起来进行充实,并将结果存储到 Pulsar Topic 中,以便以后进行进一步的计算。但结果可能包含更新事件。• 作为实时管道的一部分,聚合数据流并将结果存储到 Pulsar Topic 中,以便以后进行进一步计算。但结果可能包含更新事件。
基于这些需求,我们也实现了对 Upsert Pulsar 的支持。使用这个功能,用户能够以 upsert 的方式从 Pulsar 主题中读取数据和向 Pulsar 主题写入数据。
作为 source,upsert-pulsar 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,如果有相应的 key,数据记录中的 value 被解释为同一个 key 的最后一个 value 的 UPDATE;如果不存在相应的 key,则该更新被视为 INSERT。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何有相同 key 的已存在行都会被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
作为 sink,upsert-pulsar 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Pulsar 消息写入,并将 DELETE 数据以 value 为空的 Pulsar 消息写入(表示对应 key 的消息被删除)。Flink 根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
支持 FLIP-27[1]/FLIP 95[2] 中引入的新 Source 接口和新 Table 接口
该功能统一了批处理流的来源,优化了任务发现和数据读取的机制。这也是我们实现 Pulsar 批处理和流统一的基石。另外新的 Table API 优化了设计,增加了 DDL 计算列、元数据定义等特性。
支持 FLIP-107[3] SQL 描述符 metadata
FLIP-107 使用户能够在表定义中以元数据列的形式访问连接器元数据。用户在实时计算场景下,通常会有读写消息正文外的一些拓展信息,比如 eventTime、自定义字段等。Pulsar Flink 连接器在支持 metadata 描述符后,用户可以灵活方便地读写 Pulsar 消息的元数据。
增加 Flink 格式类型 atomic
来支持 Pulsar 原生类型 当 Flink 需要处理 Pulsar 原生类型[4]时,可以使用 atomic
作为连接器格式。
迁移
如果想从 Pulsar Flink Connector 之前的版本迁移到 2.7.0,需要对 SQL 和 API 参数做相应调整。下面我们提供了每个参数的详细信息。
SQL
在 SQL 中,DDL 声明中 Pulsar 的配置参数发生了一些变化,主要是参数名发生了变化,值未发生改变。
• 参数名中的 connector.
前缀全部被移除。•connector.type
参数名改为 connector
。• 启动模式参数名由 connector.startup-mode
改为 scan.startup.mode
。• Pulsar properties 设置调整为更直观的properties.pulsar.reader.readername=testReaderName
。
如果您在 Pulsar Flink Connector 中使用了 SQL,在迁移到 Pulsar Flink Connector 2.7.0 时需要相应调整 SQL 配置。以下为 SQL 配置调整示例。
旧版本SQL:
create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client_ip` VARCHAR,
`day`as TO_DATE(rtime),
`hour`as date_format(rtime,'HH')
)with(
'connector.type'='pulsar',
'connector.version'='1',
'connector.topic'='persistent://public/default/test_flink_sql',
'connector.service-url'='pulsar://xxx',
'connector.admin-url'='http://xxx',
'connector.startup-mode'='earliest',
'connector.properties.0.key'='pulsar.reader.readerName',
'connector.properties.0.value'='testReaderName',
'format.type'='json',
'update-mode'='append'
);
变更后的新版本 SQL:
create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client_ip` VARCHAR,
`day`as TO_DATE(rtime),
`hour`as date_format(rtime,'HH')
)with(
'connector'='pulsar',
'topic'='persistent://public/default/test_flink_sql',
'service-url'='pulsar://xxx',
'admin-url'='http://xxx',
'scan.startup.mode'='earliest',
'properties.pulsar.reader.readername'='testReaderName',
'format'='json');
API
从 API 的角度来看,我们调整了 一些类,更方便定制化。
• 为了解决序列化问题,修改了 FlinkPulsarSink
构造方法的签名,增加PulsarSerializationSchema
。• 删除与 Row 相关的不合适的类,如 FlinkPulsarRowSink
、FlinkPulsarRowSource
。如果您需要处理 Row 格式,可以使用 Flink Row 相关的序列化组件。
PulsarSerializationSchema
可以使用 PulsarSerializationSchemaWrapper.Builder
去创建。TopicKeyExtractor 的功能移动到了 PulsarSerializationSchemaWrapper,参考代码如下
newPulsarSerializationSchemaWrapper.Builder<>(newSimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();
未来计划
我们基于新的 Flink Source API(FLIP-27[5]),设计了一个集成 Pulsar Source 的批处理和流处理方案。Pulsar 采用分层架构,将数据分为流、批、冷三部分,从而实现无限容量。这让 Pulsar 成为统一批处理和流处理的理想解决方案。
基于 Flink Source API 的解决方案可以简单分为两个部分:SplitEnumerator 和 Reader。SplitEnumerator 发现并分配分区,Reader 从分区中读取数据。
Pulsar 将消息存储在 leader 块中,可以通过 Pulsar admin 获取 leader 信息,然后通过不同的分区策略提供 broker 分区、BookKeeper 分区、Offloader 分区等信息。更多详情可参考 issue #187[6]。
总结
Pulsar Flink Connector 2.7.0 已经发布,欢迎大家试用。新版本易用性更好,并支持 Pulsar 2.7 和 Flink 1.12 中的各种功能。我们把 Pulsar Flink Connector 2.7.0 贡献到 Flink 主仓库。如果你有任何疑问,欢迎提 issue[7]。
相关阅读
•如何使用 Apache Flink 查询 Pulsar 流•揭开 Pulsar Flink connector 的非神秘面纱•Pulsar Flink Connector 2.5.0 正式发布
引用链接
[1]
FLIP-27: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification[2]
FLIP 95: https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces[3]
FLIP-107: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors[4]
Pulsar 原生类型: https://pulsar.apache.org/docs/en/schema-understand/[5]
FLIP-27: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-BatchandStreamingUnification[6]
issue #187: https://github.com/streamnative/pulsar-flink/issues/187[7]
提 issue: https://github.com/streamnative/pulsar-flink/issues
点击「阅读原文」,下载试用吧